/*-------------------------------------------------------------------------
 *
 * tcn.c
 *      triggered change notification support for PostgreSQL
 *
 * Portions Copyright (c) 2011-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      contrib/tcn/tcn.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "executor/spi.h"
#include "commands/async.h"
#include "commands/trigger.h"
#include "lib/stringinfo.h"
#include "utils/rel.h"
#include "utils/syscache.h"

PG_MODULE_MAGIC;

/*
 * Append the string s to the buffer r as a quoted token.
 *
 * The output starts and ends with the quote character q.  Every occurrence
 * of q inside s is written twice; all other characters are copied through
 * unchanged.  s must be a NUL-terminated string.
 */
static void
strcpy_quoted(StringInfo r, const char *s, const char q)
{
    const char *cursor;

    /* opening quote */
    appendStringInfoCharMacro(r, q);

    for (cursor = s; *cursor != '\0'; cursor++)
    {
        /* an embedded quote gets an extra copy emitted ahead of it */
        if (*cursor == q)
            appendStringInfoCharMacro(r, q);

        appendStringInfoCharMacro(r, *cursor);
    }

    /* closing quote */
    appendStringInfoCharMacro(r, q);
}

/*
 * triggered_change_notification
 *
 * Trigger function that issues a notification describing a row change,
 * identified by the row's primary key values.
 *
 * It must be fired AFTER the change and FOR EACH ROW, and accepts at most
 * one trigger argument.  With no argument the notification goes to channel
 * "tcn"; otherwise the single argument is used as the channel name.
 *
 * The payload has the form
 *      "table",X,"keycol"='value'[,"keycol"='value' ...]
 * where X is I, U or D for insert, update or delete.  Identifiers are
 * wrapped in double quotes and values in single quotes, with embedded quote
 * characters doubled (see strcpy_quoted).  Key values are read from
 * tg_trigtuple.
 *
 * An error is raised if the table has no valid primary key index.
 */
PG_FUNCTION_INFO_V1(triggered_change_notification);

Datum
triggered_change_notification(PG_FUNCTION_ARGS)
{
    TriggerData *trigdata = (TriggerData *) fcinfo->context;
    StringInfo    payload = makeStringInfo();
    bool        have_pkey = false;
    Trigger    *trigger;
    int            nargs;
    HeapTuple    rowtuple;
    Relation    rel;
    TupleDesc    tupdesc;
    char       *channel;
    char        operation;
    List       *indexoids;
    ListCell   *lc;

    /* Validate the calling context: trigger, AFTER, FOR EACH ROW. */
    if (!CALLED_AS_TRIGGER(fcinfo))
        ereport(ERROR,
                (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
                 errmsg("triggered_change_notification: must be called as trigger")));

    if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
        ereport(ERROR,
                (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
                 errmsg("triggered_change_notification: must be called after the change")));

    if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
        ereport(ERROR,
                (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
                 errmsg("triggered_change_notification: must be called for each row")));

    /* Translate the firing event into the one-letter operation code. */
    if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
        operation = 'I';
    else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
        operation = 'U';
    else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
        operation = 'D';
    else
    {
        elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
        operation = 'X';        /* only here to keep the compiler quiet */
    }

    /* At most one trigger argument is allowed: the channel name. */
    trigger = trigdata->tg_trigger;
    nargs = trigger->tgnargs;
    if (nargs > 1)
        ereport(ERROR,
                (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
                 errmsg("triggered_change_notification: must not be called with more than one parameter")));

    channel = (nargs == 0) ? "tcn" : trigger->tgargs[0];

    /* Row and relation the key values will be read from. */
    rowtuple = trigdata->tg_trigtuple;
    rel = trigdata->tg_relation;
    tupdesc = rel->rd_att;

    /*
     * Walk the relation's index OIDs (from the relcache), fetching each
     * pg_index row from the syscache.  The first index that is both marked
     * primary and valid supplies the key columns; the scan stops there.
     */
    indexoids = RelationGetIndexList(rel);

    foreach(lc, indexoids)
    {
        Oid            idxoid = lfirst_oid(lc);
        HeapTuple    idxtup;
        Form_pg_index idxform;
        int            nkeys;

        idxtup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(idxoid));
        if (!HeapTupleIsValid(idxtup))    /* should not happen */
            elog(ERROR, "cache lookup failed for index %u", idxoid);
        idxform = (Form_pg_index) GETSTRUCT(idxtup);

        /* Anything other than a valid primary key index is skipped. */
        if (!idxform->indisprimary || !IndexIsValid(idxform))
        {
            ReleaseSysCache(idxtup);
            continue;
        }

        nkeys = idxform->indnatts;
        if (nkeys > 0)
        {
            int            k;

            have_pkey = true;

            /* "table",X */
            strcpy_quoted(payload, RelationGetRelationName(rel), '"');
            appendStringInfoCharMacro(payload, ',');
            appendStringInfoCharMacro(payload, operation);

            /* ,"column"='value' for each key column, in index order */
            for (k = 0; k < nkeys; k++)
            {
                int            attno = idxform->indkey.values[k];
                const char *colname = NameStr((tupdesc->attrs[attno - 1])->attname);

                appendStringInfoCharMacro(payload, ',');
                strcpy_quoted(payload, colname, '"');
                appendStringInfoCharMacro(payload, '=');
                strcpy_quoted(payload, SPI_getvalue(rowtuple, tupdesc, attno), '\'');
            }

            Async_Notify(channel, payload->data);
        }

        /* Primary key index handled; release it and stop scanning. */
        ReleaseSysCache(idxtup);
        break;
    }

    list_free(indexoids);

    if (!have_pkey)
        ereport(ERROR,
                (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
                 errmsg("triggered_change_notification: must be called on a table with a primary key")));

    return PointerGetDatum(NULL);    /* after trigger; value doesn't matter */
}
